Library Imports
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from datetime import datetime
from decimal import Decimal
Template
spark = (
SparkSession.builder
.master("local")
.appName("Section 2.11 - Unionizing Multiple Dataframes")
.config("spark.some.config.option", "some-value")
.getOrCreate()
)
sc = spark.sparkContext
import os
data_path = "/data/pets.csv"
base_path = os.path.dirname(os.getcwd())
path = base_path + data_path
pets = spark.read.csv(path, header=True)
pets.toPandas()
|   | id | breed_id | nickname | birthday | age | color |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown |
| 1 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None |
| 2 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None |
| 3 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white |
Unionizing Multiple Dataframes
There are a couple of situations where you would want to perform a union transformation.
Case 1: Collecting Data from Various Sources
When you're collecting data from multiple sources, at some point in your Spark application you will need to reconcile all the different sources into the same format and work with a single source of truth. This will require you to union the different datasets together.
Case 2: Performing Different Transformations on your Dataset
Sometimes you would like to perform separate transformations on different parts of your data, depending on the task. This involves breaking up your dataset into different parts and working on them individually. Then at some point you might want to stitch them back together; this would again be a union operation.
Case 1 - union() (the Wrong Way)
pets_2 = pets.select(
'breed_id',
'id',
'age',
'color',
'birthday',
'nickname'
)
(
pets
.union(pets_2)
.where(F.col('id').isin(1,2))
.toPandas()
)
|   | id | breed_id | nickname | birthday | age | color |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown |
| 1 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None |
| 2 | 1 | 1 | 5 | brown | 2014-11-22 12:30:31 | King |
| 3 | 1 | 3 | 15 | None | 2016-11-22 10:05:10 | Chewie |
| 4 | 2 | 3 | 17 | white | 2018-11-22 10:05:10 | Maple |
Case 2 - union() (Another Wrong Way)
pets_3 = pets.select(
'*',
'*'
)
pets_3.show()
(
pets
.union(pets_3)
.where(F.col('id').isin(1,2))
.toPandas()
)
+---+--------+--------+-------------------+---+-----+---+--------+--------+-------------------+---+-----+
| id|breed_id|nickname| birthday|age|color| id|breed_id|nickname| birthday|age|color|
+---+--------+--------+-------------------+---+-----+---+--------+--------+-------------------+---+-----+
| 1| 1| King|2014-11-22 12:30:31| 5|brown| 1| 1| King|2014-11-22 12:30:31| 5|brown|
| 2| 3| Argus|2016-11-22 10:05:10| 10| null| 2| 3| Argus|2016-11-22 10:05:10| 10| null|
| 3| 1| Chewie|2016-11-22 10:05:10| 15| null| 3| 1| Chewie|2016-11-22 10:05:10| 15| null|
| 3| 2| Maple|2018-11-22 10:05:10| 17|white| 3| 2| Maple|2018-11-22 10:05:10| 17|white|
+---+--------+--------+-------------------+---+-----+---+--------+--------+-------------------+---+-----+
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-5-c8157f574918> in <module>()
8 (
9 pets
---> 10 .union(pets_3)
11 .where(F.col('id').isin(1,2))
12 .toPandas()
/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.pyc in union(self, other)
1336 Also as standard in SQL, this function resolves columns by position (not by name).
1337 """
-> 1338 return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
1339
1340 @since(1.3)
/usr/local/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.pyc in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u"Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 12 columns;;\n'Union\n:- Relation[id#10,breed_id#11,nickname#12,birthday#13,age#14,color#15] csv\n+- Project [id#10, breed_id#11, nickname#12, birthday#13, age#14, color#15, id#10, breed_id#11, nickname#12, birthday#13, age#14, color#15]\n +- Relation[id#10,breed_id#11,nickname#12,birthday#13,age#14,color#15] csv\n"
What Happened?
Spark will only allow you to union `df`s that have the exact same number of columns, and where the column datatypes match up position by position.
Case 1
Because we read the csv file without specifying a schema, every column was loaded as a string. The two `df`s therefore had matching column counts and datatypes, so Spark was able to union them, but the results don't make sense at all; `union` resolves columns by position, not by name, so the columns don't match up.
Case 2
We created a new dataframe with twice the number of columns and tried to union it with the original `df`; Spark threw an error, as it doesn't know what to do when the number of columns doesn't match up.
Case 3 - union() (the Right Way)
(
pets
.union(pets_2.select(pets.columns))
.union(pets_3.select(pets.columns))
.toPandas()
)
|   | id | breed_id | nickname | birthday | age | color |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown |
| 1 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None |
| 2 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None |
| 3 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white |
| 4 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown |
| 5 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None |
| 6 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None |
| 7 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white |
| 8 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown |
| 9 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None |
| 10 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None |
| 11 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white |
What Happened?
The columns match perfectly! How? For each new `df` that you would like to union with the original `df`, you `select` the columns of the original `df` during the union. This will:
- Guarantee the ordering of the columns, as a `select` will return the columns in the order in which they are listed.
- Guarantee that only the columns of the original `df` are selected; from the previous sections, we know that `select` will return only the specified columns.
Summary
- Always, always be careful when you `union` `df`s together.
- When you `union` `df`s together you should ensure:
  - The number of columns is the same.
  - The column datatypes are exactly the same.
  - The columns are in the same order.